package com.ruoyi.ghxx.mqtt;

import com.ruoyi.ghxx.common.MsgQueue;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.Date;

/**
 * 接收消息回调函数
 */
@Slf4j
public class MqttRecieveCallback implements MqttCallback {

    /**
     * 断连后重新连接并订阅相关主题
     * @param cause
     */
    @Override
    public void connectionLost(Throwable cause) {
        log.error(cause.getMessage());
        MyMqttClient client = MyMqttClient.getInstance();
        client.connect();

        //client.subTopic("topic主题");
    }

    /**
     * 消费消息
     * @param topic
     * @param message
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {

        try {
            int qos = message.getQos();
            String msg = new String(message.getPayload(),"UTF-8");
            MsgQueue msgQueue=new MsgQueue();
            msgQueue.setQos(qos);
            msgQueue.setReporteTime(new Date());
            msgQueue.setMessage(msg);
            msgQueue.setTopic(topic);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
//        System.out.println("delivery complete");
//        System.out.println(token.isComplete());
    }
}

